home *** CD-ROM | disk | FTP | other *** search
/ PC World Komputer 2010 April / PCWorld0410.iso / hity wydania / Ubuntu 9.10 PL / karmelkowy-koliberek-9.10-netbook-remix-PL.iso / casper / filesystem.squashfs / usr / lib / python2.6 / multiprocessing / util.py < prev    next >
Text File  |  2009-11-02  |  8KB  |  292 lines

  1. #
  2. # Module providing various facilities to other parts of the package
  3. #
  4. # multiprocessing/util.py
  5. #
  6. # Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
  7. #
  8.  
  9. import itertools
  10. import weakref
  11. import atexit
  12. import threading        # we want threading to install it's
  13.                         # cleanup function before multiprocessing does
  14.  
  15. from multiprocessing.process import current_process, active_children
  16.  
  17. __all__ = [
  18.     'sub_debug', 'debug', 'info', 'sub_warning', 'get_logger',
  19.     'log_to_stderr', 'get_temp_dir', 'register_after_fork',
  20.     'is_exiting', 'Finalize', 'ForkAwareThreadLock', 'ForkAwareLocal',
  21.     'SUBDEBUG', 'SUBWARNING',
  22.     ]
  23.  
  24. #
  25. # Logging
  26. #
  27.  
  28. NOTSET = 0
  29. SUBDEBUG = 5
  30. DEBUG = 10
  31. INFO = 20
  32. SUBWARNING = 25
  33.  
  34. LOGGER_NAME = 'multiprocessing'
  35. DEFAULT_LOGGING_FORMAT = '[%(levelname)s/%(processName)s] %(message)s'
  36.  
  37. _logger = None
  38. _log_to_stderr = False
  39.  
  40. def sub_debug(msg, *args):
  41.     if _logger:
  42.         _logger.log(SUBDEBUG, msg, *args)
  43.  
  44. def debug(msg, *args):
  45.     if _logger:
  46.         _logger.log(DEBUG, msg, *args)
  47.  
  48. def info(msg, *args):
  49.     if _logger:
  50.         _logger.log(INFO, msg, *args)
  51.  
  52. def sub_warning(msg, *args):
  53.     if _logger:
  54.         _logger.log(SUBWARNING, msg, *args)
  55.  
  56. def get_logger():
  57.     '''
  58.     Returns logger used by multiprocessing
  59.     '''
  60.     global _logger
  61.     import logging, atexit
  62.  
  63.     logging._acquireLock()
  64.     try:
  65.         if not _logger:
  66.  
  67.             _logger = logging.getLogger(LOGGER_NAME)
  68.             _logger.propagate = 0
  69.             logging.addLevelName(SUBDEBUG, 'SUBDEBUG')
  70.             logging.addLevelName(SUBWARNING, 'SUBWARNING')
  71.  
  72.             # XXX multiprocessing should cleanup before logging
  73.             if hasattr(atexit, 'unregister'):
  74.                 atexit.unregister(_exit_function)
  75.                 atexit.register(_exit_function)
  76.             else:
  77.                 atexit._exithandlers.remove((_exit_function, (), {}))
  78.                 atexit._exithandlers.append((_exit_function, (), {}))
  79.  
  80.     finally:
  81.         logging._releaseLock()
  82.  
  83.     return _logger
  84.  
  85. def log_to_stderr(level=None):
  86.     '''
  87.     Turn on logging and add a handler which prints to stderr
  88.     '''
  89.     global _log_to_stderr
  90.     import logging
  91.  
  92.     logger = get_logger()
  93.     formatter = logging.Formatter(DEFAULT_LOGGING_FORMAT)
  94.     handler = logging.StreamHandler()
  95.     handler.setFormatter(formatter)
  96.     logger.addHandler(handler)
  97.  
  98.     if level:
  99.         logger.setLevel(level)
  100.     _log_to_stderr = True
  101.     return _logger
  102.  
  103. #
  104. # Function returning a temp directory which will be removed on exit
  105. #
  106.  
  107. def get_temp_dir():
  108.     # get name of a temp directory which will be automatically cleaned up
  109.     if current_process()._tempdir is None:
  110.         import shutil, tempfile
  111.         tempdir = tempfile.mkdtemp(prefix='pymp-')
  112.         info('created temp directory %s', tempdir)
  113.         Finalize(None, shutil.rmtree, args=[tempdir], exitpriority=-100)
  114.         current_process()._tempdir = tempdir
  115.     return current_process()._tempdir
  116.  
  117. #
  118. # Support for reinitialization of objects when bootstrapping a child process
  119. #
  120.  
  121. _afterfork_registry = weakref.WeakValueDictionary()
  122. _afterfork_counter = itertools.count()
  123.  
  124. def _run_after_forkers():
  125.     items = list(_afterfork_registry.items())
  126.     items.sort()
  127.     for (index, ident, func), obj in items:
  128.         try:
  129.             func(obj)
  130.         except Exception, e:
  131.             info('after forker raised exception %s', e)
  132.  
  133. def register_after_fork(obj, func):
  134.     _afterfork_registry[(_afterfork_counter.next(), id(obj), func)] = obj
  135.  
  136. #
  137. # Finalization using weakrefs
  138. #
  139.  
  140. _finalizer_registry = {}
  141. _finalizer_counter = itertools.count()
  142.  
  143.  
  144. class Finalize(object):
  145.     '''
  146.     Class which supports object finalization using weakrefs
  147.     '''
  148.     def __init__(self, obj, callback, args=(), kwargs=None, exitpriority=None):
  149.         assert exitpriority is None or type(exitpriority) is int
  150.  
  151.         if obj is not None:
  152.             self._weakref = weakref.ref(obj, self)
  153.         else:
  154.             assert exitpriority is not None
  155.  
  156.         self._callback = callback
  157.         self._args = args
  158.         self._kwargs = kwargs or {}
  159.         self._key = (exitpriority, _finalizer_counter.next())
  160.  
  161.         _finalizer_registry[self._key] = self
  162.  
  163.     def __call__(self, wr=None):
  164.         '''
  165.         Run the callback unless it has already been called or cancelled
  166.         '''
  167.         try:
  168.             del _finalizer_registry[self._key]
  169.         except KeyError:
  170.             sub_debug('finalizer no longer registered')
  171.         else:
  172.             sub_debug('finalizer calling %s with args %s and kwargs %s',
  173.                      self._callback, self._args, self._kwargs)
  174.             res = self._callback(*self._args, **self._kwargs)
  175.             self._weakref = self._callback = self._args = \
  176.                             self._kwargs = self._key = None
  177.             return res
  178.  
  179.     def cancel(self):
  180.         '''
  181.         Cancel finalization of the object
  182.         '''
  183.         try:
  184.             del _finalizer_registry[self._key]
  185.         except KeyError:
  186.             pass
  187.         else:
  188.             self._weakref = self._callback = self._args = \
  189.                             self._kwargs = self._key = None
  190.  
  191.     def still_active(self):
  192.         '''
  193.         Return whether this finalizer is still waiting to invoke callback
  194.         '''
  195.         return self._key in _finalizer_registry
  196.  
  197.     def __repr__(self):
  198.         try:
  199.             obj = self._weakref()
  200.         except (AttributeError, TypeError):
  201.             obj = None
  202.  
  203.         if obj is None:
  204.             return '<Finalize object, dead>'
  205.  
  206.         x = '<Finalize object, callback=%s' % \
  207.             getattr(self._callback, '__name__', self._callback)
  208.         if self._args:
  209.             x += ', args=' + str(self._args)
  210.         if self._kwargs:
  211.             x += ', kwargs=' + str(self._kwargs)
  212.         if self._key[0] is not None:
  213.             x += ', exitprority=' + str(self._key[0])
  214.         return x + '>'
  215.  
  216.  
  217. def _run_finalizers(minpriority=None):
  218.     '''
  219.     Run all finalizers whose exit priority is not None and at least minpriority
  220.  
  221.     Finalizers with highest priority are called first; finalizers with
  222.     the same priority will be called in reverse order of creation.
  223.     '''
  224.     if minpriority is None:
  225.         f = lambda p : p[0][0] is not None
  226.     else:
  227.         f = lambda p : p[0][0] is not None and p[0][0] >= minpriority
  228.  
  229.     items = [x for x in _finalizer_registry.items() if f(x)]
  230.     items.sort(reverse=True)
  231.  
  232.     for key, finalizer in items:
  233.         sub_debug('calling %s', finalizer)
  234.         try:
  235.             finalizer()
  236.         except Exception:
  237.             import traceback
  238.             traceback.print_exc()
  239.  
  240.     if minpriority is None:
  241.         _finalizer_registry.clear()
  242.  
  243. #
  244. # Clean up on exit
  245. #
  246.  
  247. def is_exiting():
  248.     '''
  249.     Returns true if the process is shutting down
  250.     '''
  251.     return _exiting or _exiting is None
  252.  
  253. _exiting = False
  254.  
  255. def _exit_function():
  256.     global _exiting
  257.  
  258.     info('process shutting down')
  259.     debug('running all "atexit" finalizers with priority >= 0')
  260.     _run_finalizers(0)
  261.  
  262.     for p in active_children():
  263.         if p._daemonic:
  264.             info('calling terminate() for daemon %s', p.name)
  265.             p._popen.terminate()
  266.  
  267.     for p in active_children():
  268.         info('calling join() for process %s', p.name)
  269.         p.join()
  270.  
  271.     debug('running the remaining "atexit" finalizers')
  272.     _run_finalizers()
  273.  
  274. atexit.register(_exit_function)
  275.  
  276. #
  277. # Some fork aware types
  278. #
  279.  
  280. class ForkAwareThreadLock(object):
  281.     def __init__(self):
  282.         self._lock = threading.Lock()
  283.         self.acquire = self._lock.acquire
  284.         self.release = self._lock.release
  285.         register_after_fork(self, ForkAwareThreadLock.__init__)
  286.  
  287. class ForkAwareLocal(threading.local):
  288.     def __init__(self):
  289.         register_after_fork(self, lambda obj : obj.__dict__.clear())
  290.     def __reduce__(self):
  291.         return type(self), ()
  292.